package com.xiaofan.apitest.sink

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

import java.util.Properties


/**
 * Reads raw sensor CSV records from the "sensor" Kafka topic, re-serializes
 * them through [[SensorReading]], and writes the result to the "sensor_sink"
 * topic with exactly-once semantics.
 *
 * Consume the output:
 *   bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --from-beginning --topic sensor_sink
 *
 * Delete a topic:
 *   bin/kafka-topics.sh --delete --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --topic sensor
 */
object KafkaSinkTest {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091")
    // The Kafka 0.11 consumer requires a consumer group id for offset tracking;
    // without it the source fails at startup.
    properties.setProperty("group.id", "kafka-sink-test")
    // EXACTLY_ONCE relies on Kafka transactions. Flink's producer default
    // transaction timeout (1 hour) exceeds the broker default
    // transaction.max.timeout.ms (15 minutes), which makes the producer fail
    // at runtime unless the timeout is lowered here (5 minutes).
    properties.setProperty("transaction.timeout.ms", "300000")

    // Read raw CSV lines ("id,timestamp,temperature") from the "sensor" topic.
    val inputStream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Parse each line into a SensorReading and serialize it back to a string.
    // NOTE(review): malformed lines (fewer than 3 fields or non-numeric
    // timestamp/temperature) will throw and fail the job — confirm the input
    // topic is guaranteed to be well-formed.
    val dataStream: DataStream[String] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      }
    )

    // Write the transformed records to "sensor_sink" with exactly-once semantics.
    val myProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](
      "sensor_sink",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
    dataStream.addSink(myProducer).name("kafkaSink")

    env.execute("kafka sink test")
  }
}
